{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Example of Using a Raw Image Dataset in the Feature Store\n",
    "\n",
    "Images used to train machine learning models are often stored in binary formats such as TFRecords or Parquet. However, it can sometimes be useful to store a large image dataset as a folder with one file per image, for example .jpg or .png files.\n",
    "\n",
    "This notebook demonstrates how to create a training dataset of .jpg files in the Hopsworks Feature Store."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting Spark application\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<table>\n",
       "<tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th><th>Current session?</th></tr><tr><td>18</td><td>application_1549717352737_0020</td><td>pyspark</td><td>idle</td><td><a target=\"_blank\" href=\"http://hopsworks0.logicalclocks.com:8088/proxy/application_1549717352737_0020/\">Link</a></td><td><a target=\"_blank\" href=\"http://hopsworks0.logicalclocks.com:8042/node/containerlogs/container_e01_1549717352737_0020_01_000001/demo_featurestore_admin000__meb10000\">Link</a></td><td>✔</td></tr></table>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "SparkSession available as 'spark'.\n"
     ]
    }
   ],
   "source": [
    "from hops import featurestore\n",
    "from hops import hdfs"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Step 1: Create a Placeholder Training Dataset from the Feature Store Registry\n",
    "\n",
    "As a first step, create the training dataset from the Hopsworks registry UI. This creates the metadata for the training dataset and a folder in HDFS in which to store it.\n",
    "\n",
    "![Feature Store Image Dataset 1](./../images/image_dataset_tutorial_1.png \"Feature Store Image Dataset 1\")\n",
    "\n",
    "![Feature Store Image Dataset 2](./../images/image_dataset_tutorial_2.png \"Feature Store Image Dataset 2\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Step 2: Uploading the Images\n",
    "\n",
    "Formats such as .jpg and .png are not designed to be written with big data tools, so we recommend uploading the raw images directly through the Dataset-Service in Hopsworks."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The images are stored in a folder called `<datasetname>_<version>` inside the dataset that contains your training datasets (`<projectname>_Training_Datasets`). You can get the path directly from the API:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/demo_featurestore_admin000_Training_Datasets/sample_mnist_1'"
     ]
    }
   ],
   "source": [
    "featurestore.get_training_dataset_path(\"sample_mnist\")"
   ]
  },
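  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The returned path follows a fixed layout. As a minimal sketch, the hypothetical helper `training_dataset_dir` below (not part of the `hops` API) rebuilds that layout from the project name, dataset name, and version. It assumes the `/Projects/<projectname>/<projectname>_Training_Datasets/<datasetname>_<version>` structure visible in the output above and leaves out the `hdfs://<host>:<port>` prefix.\n",
    "\n",
    "```python\n",
    "def training_dataset_dir(project, dataset, version=1):\n",
    "    # Hypothetical helper: mirrors the folder layout shown in the output above\n",
    "    # and omits the hdfs://<host>:<port> prefix.\n",
    "    return '/Projects/{0}/{0}_Training_Datasets/{1}_{2}'.format(project, dataset, version)\n",
    "\n",
    "print(training_dataset_dir('demo_featurestore_admin000', 'sample_mnist'))\n",
    "```\n",
    "\n",
    "In practice, prefer `featurestore.get_training_dataset_path`, which also resolves the HDFS namenode address."
   ]
  },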
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "![Feature Store Image Dataset 3](./../images/image_dataset_tutorial_3.png \"Feature Store Image Dataset 3\")\n",
    "\n",
    "![Feature Store Image Dataset 4](./../images/image_dataset_tutorial_4.png \"Feature Store Image Dataset 4\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Step 3: Read the Training Dataset into a Spark DataFrame or a TensorFlow Dataset\n",
    "\n",
    "Image files such as .jpg or .png can be read directly from HDFS by both Spark and TensorFlow."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Reading an Image Dataset with Spark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "images_df = featurestore.get_training_dataset(\"sample_mnist\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- image: struct (nullable = true)\n",
      " |    |-- origin: string (nullable = true)\n",
      " |    |-- height: integer (nullable = true)\n",
      " |    |-- width: integer (nullable = true)\n",
      " |    |-- nChannels: integer (nullable = true)\n",
      " |    |-- mode: integer (nullable = true)\n",
      " |    |-- data: binary (nullable = true)"
     ]
    }
   ],
   "source": [
    "images_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+\n",
      "|               image|\n",
      "+--------------------+\n",
      "|[hdfs://10.0.2.15...|\n",
      "|[hdfs://10.0.2.15...|\n",
      "|[hdfs://10.0.2.15...|\n",
      "|[hdfs://10.0.2.15...|\n",
      "|[hdfs://10.0.2.15...|\n",
      "+--------------------+\n",
      "only showing top 5 rows"
     ]
    }
   ],
   "source": [
    "images_df.show(5)"
   ]
  },
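  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Each row holds one image as a struct. As a hedged sketch, the hypothetical helper `image_struct_to_array` below turns the `height`, `width`, `nChannels`, and `data` fields of such a struct into a NumPy array. It assumes 8-bit unsigned pixel data stored row by row, which is the common case for the Spark image schema; other `mode` values would need a different dtype.\n",
    "\n",
    "```python\n",
    "import numpy as np\n",
    "\n",
    "def image_struct_to_array(height, width, n_channels, data):\n",
    "    # Assumption: one unsigned byte per channel value, stored row by row.\n",
    "    pixels = np.frombuffer(data, dtype=np.uint8)\n",
    "    return pixels.reshape(height, width, n_channels)\n",
    "\n",
    "# Stand-in for the fields of one row (a 2x2 single-channel image).\n",
    "example = image_struct_to_array(2, 2, 1, bytes(bytearray([0, 64, 128, 255])))\n",
    "print(example.shape)\n",
    "```\n",
    "\n",
    "On the driver, a single row could be converted along the lines of `row = images_df.first().image` followed by `image_struct_to_array(row.height, row.width, row.nChannels, row.data)`."
   ]
  },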
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Reading an Image Dataset with TensorFlow"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "import numpy as np"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [],
   "source": [
    "def build_tf_graph(images_dir):\n",
    "    \"\"\"\n",
    "    A simple computational graph for reading .jpg files into tensors\n",
    "    \"\"\"\n",
    "    img_filenames = tf.gfile.Glob(images_dir + \"/*.jpg\")\n",
    "    num_images = len(img_filenames)\n",
    "    img_queue = tf.train.string_input_producer(img_filenames)\n",
    "    img_reader = tf.WholeFileReader()\n",
    "    # Operation for reading a single file from the queue\n",
    "    file_name_op, file_contents_op = img_reader.read(img_queue)\n",
    "    # Operation for decoding JPEG to tensor\n",
    "    img_to_tensor_op = tf.image.decode_jpeg(file_contents_op)\n",
    "    return img_to_tensor_op, file_name_op, num_images"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [],
   "source": [
    "def run_graph_for_reading_images(sess, images_dir, num_images, img_op, file_name_op):\n",
    "    \"\"\"\n",
    "    Run the tf-graph for reading all images into tensors\n",
    "    \"\"\"\n",
    "    image_tensors = []\n",
    "    image_filenames_read = []\n",
    "    for i in range(num_images):\n",
    "        # Both ops must be evaluated in the same sess.run() call; otherwise the\n",
    "        # tensors and filenames fall out of sync, which breaks any later\n",
    "        # matching of images to labels (e.g. for a validation set).\n",
    "        img_tensor, file_name_str = sess.run([img_op, file_name_op])\n",
    "        image_tensors.append(img_tensor)\n",
    "        image_filenames_read.append(file_name_str)\n",
    "    return np.array(image_tensors), np.array(image_filenames_read)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [],
   "source": [
    "def init_graph():\n",
    "    \"\"\"\n",
    "    Initialize the graph variables and create a TensorFlow session\n",
    "    \"\"\"\n",
    "    # get operation for initializing the global variables in the graph\n",
    "    init_g = tf.global_variables_initializer()\n",
    "    \n",
    "    # create a session for encapsulating the environment where \n",
    "    # operations can be run and tensors can be evaluated\n",
    "    sess = tf.Session()\n",
    "    \n",
    "    # run the initialization operation\n",
    "    sess.run(init_g)\n",
    "    return sess"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [],
   "source": [
    "def setup_tf():\n",
    "    \"\"\"\n",
    "    Setup TF session\n",
    "    \"\"\"\n",
    "    # Initialize TF\n",
    "    sess = init_graph()\n",
    "\n",
    "    # Get coordinator for threads to be able to read\n",
    "    coord = tf.train.Coordinator()\n",
    "\n",
    "    # Start all queue runners in the graph and return the list of threads\n",
    "    threads = tf.train.start_queue_runners(coord=coord, sess=sess)\n",
    "    \n",
    "    return sess, coord, threads"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [],
   "source": [
    "images_dir = featurestore.get_training_dataset_path(\"sample_mnist\")\n",
    "img_to_tensor_op, file_name_op, num_images = build_tf_graph(images_dir)\n",
    "sess, coord, threads = setup_tf()\n",
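    "image_tensors, image_filenames_read = run_graph_for_reading_images(\n",
    "    sess,\n",
    "    images_dir,\n",
    "    num_images,\n",
    "    img_to_tensor_op,\n",
    "    file_name_op\n",
    ")\n",
    "# Stop the queue-runner threads once all images have been read\n",
    "coord.request_stop()\n",
    "coord.join(threads)"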
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "array([b'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/demo_featurestore_admin000_Training_Datasets/sample_mnist_1/img_4.jpg',\n",
      "       b'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/demo_featurestore_admin000_Training_Datasets/sample_mnist_1/img_1.jpg',\n",
      "       b'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/demo_featurestore_admin000_Training_Datasets/sample_mnist_1/img_3.jpg',\n",
      "       b'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/demo_featurestore_admin000_Training_Datasets/sample_mnist_1/img_10.jpg',\n",
      "       b'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/demo_featurestore_admin000_Training_Datasets/sample_mnist_1/img_2.jpg',\n",
      "       b'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/demo_featurestore_admin000_Training_Datasets/sample_mnist_1/img_6.jpg',\n",
      "       b'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/demo_featurestore_admin000_Training_Datasets/sample_mnist_1/img_7.jpg',\n",
      "       b'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/demo_featurestore_admin000_Training_Datasets/sample_mnist_1/img_9.jpg',\n",
      "       b'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/demo_featurestore_admin000_Training_Datasets/sample_mnist_1/img_8.jpg',\n",
      "       b'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/demo_featurestore_admin000_Training_Datasets/sample_mnist_1/img_5.jpg'],\n",
      "      dtype='|S128')"
     ]
    }
   ],
   "source": [
    "image_filenames_read"
   ]
  },
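  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The filenames above come back in arbitrary order because `tf.train.string_input_producer` shuffles its input by default. If downstream code needs a stable order, for instance to line the images up with a label file, the tensors and filenames can be sorted together. The sketch below assumes filenames of the form `img_<number>.jpg`, as in this dataset; `sort_by_image_number` is a hypothetical helper, not part of TensorFlow or `hops`.\n",
    "\n",
    "```python\n",
    "import re\n",
    "import numpy as np\n",
    "\n",
    "def sort_by_image_number(tensors, filenames):\n",
    "    # Extract the number from the base name of each 'img_<number>.jpg' path.\n",
    "    def image_number(name):\n",
    "        if isinstance(name, bytes):\n",
    "            name = name.decode('utf-8')\n",
    "        return int(re.findall('[0-9]+', name.rsplit('/', 1)[-1])[-1])\n",
    "    order = np.argsort([image_number(n) for n in filenames])\n",
    "    # Apply the same permutation to both arrays so they stay aligned.\n",
    "    return np.asarray(tensors)[order], np.asarray(filenames)[order]\n",
    "\n",
    "# Small stand-in arrays instead of the real tensors and HDFS paths.\n",
    "names = np.array([b'dir/img_10.jpg', b'dir/img_2.jpg', b'dir/img_1.jpg'])\n",
    "values = np.array([10, 2, 1])\n",
    "sorted_values, sorted_names = sort_by_image_number(values, names)\n",
    "print(sorted_values)\n",
    "```\n",
    "\n",
    "With the arrays from this notebook, the call would be `sort_by_image_number(image_tensors, image_filenames_read)`."
   ]
  },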
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "(28, 28, 1)"
     ]
    }
   ],
   "source": [
    "image_tensors[0].shape"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 2
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}